package io.openmessaging.demo;

import io.openmessaging.BytesMessage;

import java.io.File;
import java.io.FilenameFilter;
import java.io.RandomAccessFile;
import java.nio.BufferUnderflowException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;

/**
 * Sequentially reads messages back from the message files of a store directory.
 *
 * <p>For every requested base name, the files in the store directory whose lower-cased name
 * starts with {@code "msg." + baseName + "-"} are selected and read oldest-first by
 * last-modified time. Each file begins with a 4-byte int header holding the end position of
 * its data; only the bytes in {@code [4, endPosition)} are mapped and read.
 *
 * <p>An instance keeps a read cursor and performs no synchronization of its own.
 *
 * Author :  Rocky
 * Date : 26/05/2017 17:53
 */
public class ReadUtils {

    /** Physical file-name prefix shared by every message file. */
    private static final String MSG_FILE_PREFIX = "msg.";

    /** Size in bytes of the int header stored at the start of every message file. */
    private static final int HEADER_SIZE = 4;

    /** Directory that contains the message files. */
    private final File storeDirFile;

    /** Files that still have to be opened, in reading order. */
    private final Iterator<File> waitReadingFiles;

    /** Mapping of the file currently being read, or {@code null} if none is open. */
    private MappedByteBuffer currMbb;

    /**
     * Collects the files to read for the given base names.
     *
     * @param storeDirPath             path of the directory holding the message files
     * @param waitReadingFileBaseNames base names whose message files should be read
     */
    public ReadUtils(String storeDirPath, HashSet<String> waitReadingFileBaseNames) {
        storeDirFile = new File(storeDirPath);

        List<File> selectedFiles = new ArrayList<>();
        for (String baseName : waitReadingFileBaseNames) {
            // Locale.ROOT keeps the case folding independent of the JVM's default locale.
            String prefix = (MSG_FILE_PREFIX + baseName + "-").toLowerCase(Locale.ROOT);
            selectedFiles.addAll(listFile(prefix));
        }

        this.waitReadingFiles = selectedFiles.iterator();
    }

    /**
     * Returns the next message, moving on to the following files as each one is exhausted.
     *
     * @return the next message, or {@code null} once every file has been fully read
     */
    public BytesMessage pullMessage() {
        while (true) {
            if (currMbb == null) {
                currMbb = getNextMbb();
                // No file left: everything has been read.
                if (currMbb == null) {
                    return null;
                }
            }

            BytesMessage message = doReadMessage(currMbb);
            if (message != null) {
                return message;
            }

            // The current file has nothing more to offer. Keep looping instead of trying
            // just one more file, so that an empty file in the middle of the sequence is
            // not mistaken for the end of the data.
            currMbb = null;
        }
    }

    /**
     * Reads one message from the given buffer.
     *
     * @return the message, or {@code null} if the reader returned {@code null} or ran past
     *         the end of the buffer
     */
    private BytesMessage doReadMessage(MappedByteBuffer currMbb) {
        try {
            return MessageUtils2.readMessage(currMbb);
        } catch (BufferUnderflowException e) {
            // Running out of bytes marks the end of this file's data.
            return null;
        }
    }

    /**
     * Maps the data region of the next readable file.
     *
     * <p>A file that cannot be opened or mapped is reported on stderr and skipped, so one
     * damaged file does not look like the end of the data.
     *
     * @return the mapping of the next readable file, or {@code null} if no file is left
     */
    private MappedByteBuffer getNextMbb() {
        while (waitReadingFiles.hasNext()) {
            File file = waitReadingFiles.next();
            // try-with-resources closes the file even when readInt() or map() throws;
            // a mapping stays valid after its channel has been closed.
            try (RandomAccessFile raf = new RandomAccessFile(file, "r");
                 FileChannel channel = raf.getChannel()) {
                // The header int is the absolute end position of the data in the file.
                int endPosition = raf.readInt();
                return channel.map(FileChannel.MapMode.READ_ONLY, HEADER_SIZE, endPosition - HEADER_SIZE);
            } catch (Exception e) {
                System.err.println("Skipping unreadable message file: " + file);
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * Lists the files in the store directory whose lower-cased name starts with the given
     * prefix, ordered by ascending last-modified time.
     *
     * @param fileBaseName lower-cased file-name prefix to match
     * @return the matching files, oldest first; empty if the directory cannot be listed
     */
    private List<File> listFile(String fileBaseName) {
        File[] matched = storeDirFile.listFiles(
                (dir, name) -> name.toLowerCase(Locale.ROOT).startsWith(fileBaseName));
        // listFiles returns null when the path is not a directory or an I/O error occurs.
        if (matched == null) {
            return new ArrayList<>();
        }

        List<File> result = new ArrayList<>(Arrays.asList(matched));
        result.sort(Comparator.comparingLong(File::lastModified));
        return result;
    }
}
